package com.cloudansys.config;

import com.cloudansys.core.constant.Const;
import com.cloudansys.core.util.TextUtil;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Query;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

@Slf4j
public class DefaultConfig {

    private static Properties props = new Properties();
    private static JedisPool jedisPool = null;
    private static HikariDataSource hkDateSource = null;
    private static InfluxDB influxDB;
    private static MqttClient mqttClient;

    // 初始化
    static {
        try {
            props.load(DefaultConfig.class.getResourceAsStream(getEnv()));
        } catch (IOException e) {
            log.error("获取配置文件失败");
            e.printStackTrace();
        }
        initJedisPool();
        initHkDateSource();
        initInfluxDB();
        initMqtt();
    }

    /**
     * 获取当前环境的配置文件名称
     */
    private static String getEnv() throws IOException {
        Properties prop = new Properties();
        InputStream inputStream = DefaultConfig.class.getClassLoader().getResourceAsStream(Const.ENV_PROP);
        prop.load(inputStream);
        String env = prop.getProperty(Const.ENV);
        String envFile = env + Const.ENV_SUFFIX;
        log.info("运行环境: {}", env);
        return Const.PTN_SLASH + envFile;
    }

    /**
     * 获取 配置文件中某个 key 所对应的 value
     */
    public static String get(String key) {
        return props.getProperty(key);
    }

    /**
     * 获取 pipeline
     *
     * @return pipeline
     */
    public static Pipeline getPipeline() {
        return jedisPool.getResource().pipelined();
    }

    /**
     * 获取Jedis实例
     *
     * @return Jedis
     */
    public static Jedis getJedis() {
        return jedisPool.getResource();
    }

    /**
     * 初始化 JedisPool 配置信息
     */
    private static void initJedisPool() {
        try {
            String host = props.getProperty(Const.REDIS_HOST);
            int dbIndex = Integer.parseInt(props.getProperty(Const.REDIS_DB));
            int port = Integer.parseInt(props.getProperty(Const.REDIS_PORT));
            String password = props.getProperty(Const.REDIS_PASSWORD);
            JedisPoolConfig config = new JedisPoolConfig();
            //最大空闲连接数, 默认8个
            config.setMaxIdle(50);
            //最大连接数, 默认8个
            config.setMaxTotal(1000);
            //获取连接时的最大等待毫秒数
            config.setMaxWaitMillis(30000);
            //在获取连接的时候检查有效性, 默认false
            config.setTestOnBorrow(Boolean.TRUE);
            config.setTestOnReturn(Boolean.TRUE);
            jedisPool = new JedisPool(config, host, port, 30000, password, dbIndex);
        } catch (NumberFormatException e) {
            log.error("jedis 初始化失败");
            e.printStackTrace();
        }
    }

    /**
     * 初始化 influxDB 配置，连接数据库
     */
    private static void initInfluxDB() {
        try {
            String serverURL = props.getProperty(Const.INFLUX_SERVER_URL);
            String database = props.getProperty(Const.INFLUX_DATABASE);
            String username = props.getProperty(Const.INFLUX_USERNAME);
            String password = props.getProperty(Const.INFLUX_PASSWORD);
            influxDB = InfluxDBFactory.connect(serverURL, username, password);
            createDB(database);
            influxDB.setDatabase(database);
            // 批大小（万条）
            int batchSize = 10;
            // 写间隔（ms）
            int interval = 1000;
            influxDB
                    .enableGzip()
                    // 第一个是point的个数，第二个是时间，单位毫秒，第三个时间单位
                    .enableBatch(batchSize * 10000, interval, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.error("InfluxDB 配置初始化失败");
            e.printStackTrace();
        }
    }

    /****
     *  创建数据库，数据库不存在会自动创建，存在则无操作
     */
    private static void createDB(String database) {
        try {
            influxDB.query(new Query("CREATE DATABASE " + database));
        } catch (NumberFormatException e) {
            log.info("### {} 创建失败！###", database);
            log.error("errMsg: {}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 获取 InfluxDB 查询客户端
     * 注意：不要使用这个进行写入操作
     *
     * @return influxDB 实例
     */
    public static InfluxDB getInfluxDB() {
        if (influxDB == null) {
            initInfluxDB();
        }
        return influxDB;
    }

    /**
     * 获取mqtt客户端 ，若不存在则创建
     *
     * @return mqtt 实例
     */
    public static MqttClient getMqttClient() {
        if (mqttClient == null) {
            initMqtt();
        }
        return mqttClient;
    }

    /**
     * mqtt 配置
     */
    private static void initMqtt() {
        MqttConnectOptions options = new MqttConnectOptions();
        String serverURIs = props.getProperty(Const.MQTT_SERVER_URIS);
        String username = props.getProperty(Const.MQTT_USERNAME);
        String password = props.getProperty(Const.MQTT_PASSWORD);
        String clientId = TextUtil.randomUUID(10);
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录
        options.setCleanSession(true);
        options.setConnectionTimeout(30);
        //设置断开后重新连接
        options.setAutomaticReconnect(true);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        // 默认值 10，发布消息很多时会报错：MQTT(32202): 正在发布过多的消息
        options.setMaxInflight(1000);
        try {
            mqttClient = new MqttClient(serverURIs, clientId, new MemoryPersistence());
            mqttClient.connect(options);
        } catch (MqttException e) {
            log.error("Mqtt 初始化失败");
            e.printStackTrace();
        }
    }

    /**
     * RabbitMQ 数据源（Flink）
     */
    public static RMQSource<String> getRMQSource() {
        return new RMQSource<String>(
                DefaultConfig.getRabbitMQConfig(),
                props.getProperty(Const.RABBITMQ_QUEUE_NAME),
                new SimpleStringSchema()) {
            /**
             * 如果 rabbitmq 中的 queue 设置了 ttl
             * 这里要把 queueDeclare 的第二个参数修改成 true，并配置 x-message-ttl
             */
            @Override
            protected void setupQueue() throws IOException {
                if (queueName != null) {
                    Map<String, Object> arguments = new HashMap<>();
                    arguments.put("x-message-ttl", 259200000);
                    channel.queueDeclare(queueName, true, false, false, arguments);
                }
            }
        };
    }

    /**
     * RabbitMQ 配置
     */
    private static RMQConnectionConfig getRabbitMQConfig() {
        String host = props.getProperty(Const.RABBITMQ_HOST);
        int port = Integer.parseInt(props.getProperty(Const.RABBITMQ_PORT));
        String username = props.getProperty(Const.RABBITMQ_USERNAME);
        String password = props.getProperty(Const.RABBITMQ_PASSWORD);
        String virtualHost = props.getProperty(Const.RABBITMQ_VIRTUAL_HOST);
        return new RMQConnectionConfig.Builder()
                .setHost(host)
                .setPort(port)
                .setUserName(username)
                .setPassword(password)
                .setVirtualHost(virtualHost)
                .build();
    }

    /**
     * 获取 mysql 连接
     */
    public static Connection getMysqlConnection() throws SQLException {
        return hkDateSource.getConnection();
    }

    /**
     * MySQL 配置
     * 使用 HikariCP 连接池
     */
    private static void initHkDateSource() {
        try {
            HikariConfig hkConfig = new HikariConfig();
            String url = props.getProperty(Const.MYSQL_URL);
            String username = props.getProperty(Const.MYSQL_USERNAME);
            String password = props.getProperty(Const.MYSQL_PASSWORD);
            hkConfig.setJdbcUrl(url);
            hkConfig.setUsername(username);
            hkConfig.setPassword(password);
            hkConfig.addDataSourceProperty("cachePrepStmts", "true");
            hkConfig.addDataSourceProperty("prepStmtCacheSize", "250");
            hkConfig.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
            hkDateSource = new HikariDataSource(hkConfig);
        } catch (Exception e) {
            log.error("HikariCP 初始化失败");
            e.printStackTrace();
        }
    }

    /**
     * @return 返回任务执行名称
     */
    public static String getJobName() {
        return props.getProperty(Const.APPLICATION_NAME);
    }

    /**
     * 钉钉机器人
     * 获取项目名称
     *
     * @return 返回配置的项目名称
     */
    public static String getProject() {
        return props.getProperty(Const.DING_PROJECT);
    }

    /**
     * 钉钉机器人
     * 获取签名
     *
     * @return 返回签名
     */
    public static String getSign() throws Exception {
        String baseUrl = props.getProperty(Const.DING_BASE_URL);
        String token = props.getProperty(Const.DING_TOKEN);
        String secret = props.getProperty(Const.DING_SECRET);
        long timestamp = System.currentTimeMillis();
        String stringToSign = timestamp + "\n" + secret;
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
        byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
        return baseUrl + token + "&timestamp=" + timestamp + "&sign=" +
                URLEncoder.encode(new String(Base64.encodeBase64(signData)), "UTF-8");
    }

}
